package gqueue

import (
	"context"
	"gitee.com/zjlsliupei/ghelp"
	"github.com/beego/beego/v2/core/logs"
	"github.com/go-redis/redis/v8"
	"github.com/tidwall/gjson"
	"strings"
	"time"
)

type QueueRedis struct {
	option string
	rdb    *redis.Client
}

func NewQueueRedis(option string) (*QueueRedis, error) {
	q := QueueRedis{}
	q.option = option
	err := q.Init()
	return &q, err
}

func (r *QueueRedis) Init() error {
	option := gjson.Parse(r.option)
	r.rdb = redis.NewClient(&redis.Options{
		Addr:     option.Get("addr").String(),
		Password: option.Get("password").String(), // no password set
		DB:       int(option.Get("db").Int()),     // use default DB
	})
	return nil
}

func (r *QueueRedis) Publish(queueName, msg string) (string, error) {
	cmd := r.do("LPUSH", r.getRealQueueName(queueName), msg)
	return "don`t need id", cmd.Err()
}

// getRealQueueName 获取实际queueName，如果有prefix会自动拼装上去
func (r *QueueRedis) getRealQueueName(queueName string) string {
	return gjson.Get(r.option, "prefix").String() + queueName
}

func (r *QueueRedis) Subscribe(queueName string, cb func(msg string) bool) {
	go func() {
		for {
			cmd := r.do("BLPOP", r.getRealQueueName(queueName), 5000)
			content, err := cmd.Slice()
			if err != nil {
				// 判断如果是i/o timeout，马上发起请求
				if strings.Index(err.Error(), "i/o timeout") >= 0 {
					continue
				} else {
					logs.Error("BLPOP err", err)
					time.Sleep(3 * time.Second)
					continue
				}
			}
			j := gjson.Parse(ghelp.JsonEncode(content))
			msg := j.Get("1").String()
			if cb(msg) {
				// 执行相关ack操作
			} else {
				// 重试等机制
			}
		}
	}()
}

func (r *QueueRedis) do(args ...interface{}) *redis.Cmd {
	return r.rdb.Do(context.Background(), args...)
}
